/*
 * Copyright (C) 2016 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package okhttp3.internal.cache2;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

import okio.Buffer;
import okio.ByteString;
import okio.Source;
import okio.Timeout;

import static okhttp3.internal.Util.closeQuietly;

/**
 * Replicates a single upstream source into multiple downstream sources. Each downstream source
 * returns the same bytes as the upstream source. Downstream sources may read data either as it
 * is returned by upstream, or after the upstream source has been exhausted.
 * <p>
 * <p>As bytes are returned from upstream they are written to a local file. Downstream sources read
 * from this file as necessary.
 * <p>
 * <p>This class also keeps a small buffer of bytes recently read from upstream. This is intended to
 * save a small amount of file I/O and data copying.
 */
// TODO(jwilson): what to do about timeouts? They could be different and unfortunately when any
//     timeout is hit we like to tear down the whole stream.
final class Relay {
    private static final int SOURCE_UPSTREAM = 1;
    private static final int SOURCE_FILE = 2;

    static final ByteString PREFIX_CLEAN = ByteString.encodeUtf8("OkHttp cache v1\n");
    static final ByteString PREFIX_DIRTY = ByteString.encodeUtf8("OkHttp DIRTY :(\n");
    private static final long FILE_HEADER_SIZE = 32L;

    /**
     * Read/write persistence of the upstream source and its metadata. Its layout is as follows:
     * <p>
     * <ul>
     * <li>16 bytes: either {@code OkHttp cache v1\n} if the persisted file is complete. This is
     * another sequence of bytes if the file is incomplete and should not be used.
     * <li>8 bytes: <i>n</i>: upstream data size
     * <li>8 bytes: <i>m</i>: metadata size
     * <li><i>n</i> bytes: upstream data
     * <li><i>m</i> bytes: metadata
     * </ul>
     * <p>
     * <p>This is closed and assigned to null when the last source is closed and no further sources
     * are permitted.
     */
    RandomAccessFile file;

    /**
     * The thread that currently has access to upstream. Possibly null. Guarded by this.
     */
    Thread upstreamReader;

    /**
     * Null once the file has a complete copy of the upstream bytes. Only the {@code upstreamReader}
     * thread may access this source.
     */
    Source upstream;

    /**
     * A buffer for {@code upstreamReader} to use when pulling bytes from upstream. Only the {@code
     * upstreamReader} thread may access this buffer.
     */
    final Buffer upstreamBuffer = new Buffer();

    /**
     * The number of bytes consumed from {@link #upstream}. Guarded by this.
     */
    long upstreamPos;

    /**
     * True if there are no further bytes to read from {@code upstream}. Guarded by this.
     */
    boolean complete;

    /**
     * User-supplied additional data persisted with the source data.
     */
    private final ByteString metadata;

    /**
     * The most recently read bytes from {@link #upstream}. This is a suffix of {@link #file}. Guarded
     * by this.
     */
    final Buffer buffer = new Buffer();

    /**
     * The maximum size of {@code buffer}.
     */
    final long bufferMaxSize;

    /**
     * Reference count of the number of active sources reading this stream. When decremented to 0
     * resources are released and all following calls to {@link #newSource} return null. Guarded by
     * this.
     */
    int sourceCount;

    private Relay(RandomAccessFile file, Source upstream, long upstreamPos, ByteString metadata,
                  long bufferMaxSize) {
        this.file = file;
        this.upstream = upstream;
        this.complete = upstream == null;
        this.upstreamPos = upstreamPos;
        this.metadata = metadata;
        this.bufferMaxSize = bufferMaxSize;
    }

    /**
     * Creates a new relay that reads a live stream from {@code upstream}, using {@code file} to share
     * that data with other sources.
     * <p>
     * <p><strong>Warning:</strong> callers to this method must immediately call {@link #newSource} to
     * create a source and close that when they're done. Otherwise a handle to {@code file} will be
     * leaked.
     */
    public static Relay edit(
            File file, Source upstream, ByteString metadata, long bufferMaxSize) throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
        Relay result = new Relay(randomAccessFile, upstream, 0L, metadata, bufferMaxSize);

        // Write a dirty header. That way if we crash we won't attempt to recover this.
        randomAccessFile.setLength(0L);
        result.writeHeader(PREFIX_DIRTY, -1L, -1L);

        return result;
    }

    /**
     * Creates a relay that reads a recorded stream from {@code file}.
     * <p>
     * <p><strong>Warning:</strong> callers to this method must immediately call {@link #newSource} to
     * create a source and close that when they're done. Otherwise a handle to {@code file} will be
     * leaked.
     */
    public static Relay read(File file) throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
        FileOperator fileOperator = new FileOperator(randomAccessFile.getChannel());

        // Read the header.
        Buffer header = new Buffer();
        fileOperator.read(0, header, FILE_HEADER_SIZE);
        ByteString prefix = header.readByteString(PREFIX_CLEAN.size());
        if (!prefix.equals(PREFIX_CLEAN)) throw new IOException("unreadable cache file");
        long upstreamSize = header.readLong();
        long metadataSize = header.readLong();

        // Read the metadata.
        Buffer metadataBuffer = new Buffer();
        fileOperator.read(FILE_HEADER_SIZE + upstreamSize, metadataBuffer, metadataSize);
        ByteString metadata = metadataBuffer.readByteString();

        // Return the result.
        return new Relay(randomAccessFile, null, upstreamSize, metadata, 0L);
    }

    private void writeHeader(
            ByteString prefix, long upstreamSize, long metadataSize) throws IOException {
        Buffer header = new Buffer();
        header.write(prefix);
        header.writeLong(upstreamSize);
        header.writeLong(metadataSize);
        if (header.size() != FILE_HEADER_SIZE) throw new IllegalArgumentException();

        FileOperator fileOperator = new FileOperator(file.getChannel());
        fileOperator.write(0, header, FILE_HEADER_SIZE);
    }

    private void writeMetadata(long upstreamSize) throws IOException {
        Buffer metadataBuffer = new Buffer();
        metadataBuffer.write(metadata);

        FileOperator fileOperator = new FileOperator(file.getChannel());
        fileOperator.write(FILE_HEADER_SIZE + upstreamSize, metadataBuffer, metadata.size());
    }

    void commit(long upstreamSize) throws IOException {
        // Write metadata to the end of the file.
        writeMetadata(upstreamSize);
        file.getChannel().force(false);

        // Once everything else is in place we can swap the dirty header for a clean one.
        writeHeader(PREFIX_CLEAN, upstreamSize, metadata.size());
        file.getChannel().force(false);

        // This file is complete.
        synchronized (Relay.this) {
            complete = true;
        }

        closeQuietly(upstream);
        upstream = null;
    }

    boolean isClosed() {
        return file == null;
    }

    public ByteString metadata() {
        return metadata;
    }

    /**
     * Returns a new source that returns the same bytes as upstream. Returns null if this relay has
     * been closed and no further sources are possible. In that case callers should retry after
     * building a new relay with {@link #read}.
     */
    public Source newSource() {
        synchronized (Relay.this) {
            if (file == null) return null;
            sourceCount++;
        }

        return new RelaySource();
    }

    class RelaySource implements Source {
        private final Timeout timeout = new Timeout();

        /**
         * The operator to read and write the shared file. Null if this source is closed.
         */
        private FileOperator fileOperator = new FileOperator(file.getChannel());

        /**
         * The next byte to read. This is always less than or equal to {@code upstreamPos}.
         */
        private long sourcePos;

        /**
         * Selects where to find the bytes for a read and read them. This is one of three sources.
         * <p>
         * <h3>Upstream:</h3>
         * In this case the current thread is assigned as the upstream reader. We read bytes from
         * upstream and copy them to both the file and to the buffer. Finally we release the upstream
         * reader lock and return the new bytes.
         * <p>
         * <h3>The file</h3>
         * In this case we copy bytes from the file to the {@code sink}.
         * <p>
         * <h3>The buffer</h3>
         * In this case the bytes are immediately copied into {@code sink} and the number of bytes
         * copied is returned.
         * <p>
         * <p>If upstream would be selected but another thread is already reading upstream this will
         * block until that read completes. It is possible to time out while waiting for that.
         */
        @Override
        public long read(Buffer sink, long byteCount) throws IOException {
            if (fileOperator == null) throw new IllegalStateException("closed");

            long upstreamPos;
            int source;

            selectSource:
            synchronized (Relay.this) {
                // We need new data from upstream.
                while (sourcePos == (upstreamPos = Relay.this.upstreamPos)) {
                    // No more data upstream. We're done.
                    if (complete) return -1L;

                    // Another thread is already reading. Wait for that.
                    if (upstreamReader != null) {
                        timeout.waitUntilNotified(Relay.this);
                        continue;
                    }

                    // We will do the read.
                    upstreamReader = Thread.currentThread();
                    source = SOURCE_UPSTREAM;
                    break selectSource;
                }

                long bufferPos = upstreamPos - buffer.size();

                // Bytes of the read precede the buffer. Read from the file.
                if (sourcePos < bufferPos) {
                    source = SOURCE_FILE;
                    break selectSource;
                }

                // The buffer has the data we need. Read from there and return immediately.
                long bytesToRead = Math.min(byteCount, upstreamPos - sourcePos);
                buffer.copyTo(sink, sourcePos - bufferPos, bytesToRead);
                sourcePos += bytesToRead;
                return bytesToRead;
            }

            // Read from the file.
            if (source == SOURCE_FILE) {
                long bytesToRead = Math.min(byteCount, upstreamPos - sourcePos);
                fileOperator.read(FILE_HEADER_SIZE + sourcePos, sink, bytesToRead);
                sourcePos += bytesToRead;
                return bytesToRead;
            }

            // Read from upstream. This always reads a full buffer: that might be more than what the
            // current call to Source.read() has requested.
            try {
                long upstreamBytesRead = upstream.read(upstreamBuffer, bufferMaxSize);

                // If we've exhausted upstream, we're done.
                if (upstreamBytesRead == -1L) {
                    commit(upstreamPos);
                    return -1L;
                }

                // Update this source and prepare this call's result.
                long bytesRead = Math.min(upstreamBytesRead, byteCount);
                upstreamBuffer.copyTo(sink, 0, bytesRead);
                sourcePos += bytesRead;

                // Append the upstream bytes to the file.
                fileOperator.write(
                        FILE_HEADER_SIZE + upstreamPos, upstreamBuffer.clone(), upstreamBytesRead);

                synchronized (Relay.this) {
                    // Append new upstream bytes into the buffer. Trim it to its max size.
                    buffer.write(upstreamBuffer, upstreamBytesRead);
                    if (buffer.size() > bufferMaxSize) {
                        buffer.skip(buffer.size() - bufferMaxSize);
                    }

                    // Now that the file and buffer have bytes, adjust upstreamPos.
                    Relay.this.upstreamPos += upstreamBytesRead;
                }

                return bytesRead;
            } finally {
                synchronized (Relay.this) {
                    upstreamReader = null;
                    Relay.this.notifyAll();
                }
            }
        }

        @Override
        public Timeout timeout() {
            return timeout;
        }

        @Override
        public void close() throws IOException {
            if (fileOperator == null) return; // Already closed.
            fileOperator = null;

            RandomAccessFile fileToClose = null;
            synchronized (Relay.this) {
                sourceCount--;
                if (sourceCount == 0) {
                    fileToClose = file;
                    file = null;
                }
            }

            if (fileToClose != null) {
                closeQuietly(fileToClose);
            }
        }
    }
}
